
Serialize engine config in new pdsh benchmark CLI #22365

Open
TomAugspurger wants to merge 13 commits into rapidsai:main from TomAugspurger:tom/serialize-engine-config

Conversation

@TomAugspurger
Contributor

Description

This updates the cudf-polars benchmarks CLI to serialize the engine configuration. This will let us see what options were actually used.

@copy-pr-bot

copy-pr-bot Bot commented May 4, 2026

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

@github-actions github-actions Bot added labels: Python (Affects Python cuDF API), cudf-polars (Issues specific to cudf-polars) May 4, 2026
@GPUtester GPUtester moved this to In Progress in cuDF Python May 4, 2026
@TomAugspurger TomAugspurger marked this pull request as ready for review May 4, 2026 16:15
@TomAugspurger TomAugspurger requested a review from a team as a code owner May 4, 2026 16:15
@TomAugspurger TomAugspurger requested a review from rjzamora May 4, 2026 16:15
@TomAugspurger TomAugspurger added labels: bug (Something isn't working), non-breaking (Non-breaking change) May 4, 2026
This adds back a `--spill-to-pinned-memory` CLI option and threads it
through for the new frontends.
@TomAugspurger
Contributor Author

This needs more work. StreamingExecutor.spill_to_pinned_memory doesn't actually have any effect on pinned memory with the new frontends. That's controlled through the rapidsmpf_options["pinned_memory"] option / --pinned-memory CLI.

We need to ensure that the config_options contains, in some way, the rapidsmpf_options.

@TomAugspurger
Contributor Author

I've updated this to serialize config_options differently for the new StreamingEngine frontends to account for rapidsmpf_options being a level above `ConfigOptions`. Now, we'll have a dict

{
   "config_options": ...,  # the old `config_options`, i.e. `dataclasses.asdict(cudf_polars.utils.config.ConfigOptions)`
   "rapidsmpf_options": ...,  # `rapidsmpf.config.Options.get_strings()`
}

And here are some example values of .rapidsmpf_options:

# dask
{
  "num_streaming_threads": "4"
}
# ray
{
  "num_streaming_threads": "4"
}
# spmd
{
  "spill_device_limit": "",
  "allow_overbooking_by_default": "",
  "log": "",
  "periodic_spill_check": "",
  "pinned_memory": "",
  "num_streaming_threads": "",
  "statistics": "",
  "memory_reserve_timeout": "",
  "num_streams": "",
  "unbounded_file_read_cache": ""
}

Unfortunately, I don't think rapidsmpf currently has a way to view the actual values the options resolve to without explicitly setting or using them, which is why the dask/ray examples only have the num_streaming_threads while the spmd has more. And now that I look at it, I have no idea why those are set to empty strings. I'll need to investigate that...
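The nesting described above can be sketched as follows. This is a hypothetical illustration: the `ConfigOptions` fields and the `FakeOptions` class are stand-ins; only `dataclasses.asdict` and `Options.get_strings()` come from the comment above.

```python
import dataclasses
import json


@dataclasses.dataclass
class ConfigOptions:
    # Hypothetical stand-in for cudf_polars.utils.config.ConfigOptions.
    executor: str = "streaming"
    scheduler: str = "synchronous"


class FakeOptions:
    # Hypothetical stand-in for rapidsmpf.config.Options: get_strings()
    # returns a dict of option-name -> string value.
    def __init__(self, strings):
        self._strings = strings

    def get_strings(self):
        return dict(self._strings)


def serialize_engine_config(config_options, rapidsmpf_options):
    # Build the two-level dict described above: dataclass fields on one
    # side, stringified rapidsmpf options on the other.
    return {
        "config_options": dataclasses.asdict(config_options),
        "rapidsmpf_options": rapidsmpf_options.get_strings(),
    }


result = serialize_engine_config(
    ConfigOptions(), FakeOptions({"num_streaming_threads": "4"})
)
print(json.dumps(result, indent=2))
```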

@TomAugspurger
Contributor Author

@madsbk do you have any ideas about how to handle #22365 (comment)? Specifically,

  1. A way for a rapidsmpf.config.Options() object to resolve to the actual defaults used in rapidsmpf
  2. Why the values in rapidsmpf_options.get_strings() are all "" for the SPMD engine?

@madsbk
Member

madsbk commented May 5, 2026

  1. A way for a rapidsmpf.config.Options() object to resolve to the actual defaults used in rapidsmpf

I don’t think it is possible with the current design, where each call site parses the string value and defines its own default.

  2. Why the values in rapidsmpf_options.get_strings() are all "" for the SPMD engine?

An empty string means that no one has accessed and parsed that option yet.
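That behavior can be illustrated with a hypothetical minimal sketch (not the real rapidsmpf API): each call site parses the stored string and supplies its own default, and an option's string stays empty until something accesses it.

```python
class LazyOptions:
    """Hypothetical sketch of rapidsmpf-style lazy option resolution."""

    def __init__(self, strings=None):
        # Every known key starts as "" until some call site accesses it.
        self._strings = dict(strings or {})

    def get(self, key, default, parser):
        # Each call site supplies its own default and parser; the resolved
        # value is written back as a string so later reads see it.
        raw = self._strings.get(key, "")
        value = parser(raw) if raw else default
        self._strings[key] = str(value)
        return value

    def get_strings(self):
        return dict(self._strings)


opts = LazyOptions({"num_streaming_threads": "", "pinned_memory": ""})
# Only this option is ever accessed, so only it resolves to a real value:
opts.get("num_streaming_threads", default=4, parser=int)
print(opts.get_strings())
# {'num_streaming_threads': '4', 'pinned_memory': ''}
```

This is why the dask/ray examples above only show `num_streaming_threads`: nothing had parsed the other options yet when `get_strings()` was called.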

@TomAugspurger
Contributor Author

OK, I think this might be all we can do on the cudf-polars side then. I've opened rapidsai/rapidsmpf#1014 to track what I think needs to happen on the rapidsmpf side, and then hopefully we can get back to a state where all of the actual runtime options are being recorded.

For now, I'll merge this so that the non-rapidsmpf options are being recorded properly.

@coderabbitai

coderabbitai Bot commented May 8, 2026


📝 Walkthrough

Summary by CodeRabbit

  • Improvements

    • Enhanced benchmark result serialization to include engine-specific RapidsMPF configuration details in distributed execution scenarios.
  • Refactor

    • Reorganized distributed engine configuration storage to persist resolved options on instances for consistent access during initialization.
    • Updated benchmark finalization to properly pass engine context during serialization.

Walkthrough

This PR persists RapidsMPF options as instance attributes on streaming engine backends (SPMD, Ray, Dask), updates benchmark serialization to accept StreamingEngine and augment output with engine-derived configuration, and wires the active engine through finalization call sites instead of always serializing with engine=None.

Changes

RapidsMPF Options Persistence and Serialization

Layer / File(s) Summary
Type Declaration and Imports
python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/core.py
StreamingEngine declares rapidsmpf_options: rapidsmpf.config.Options attribute type annotation; rapidsmpf.config is explicitly imported.
Engine Instance Initialization
python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/spmd.py, python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/ray.py, python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/dask.py
SPMD, Ray, and Dask engines now resolve RapidsMPF options once, store them on self.rapidsmpf_options, and reference the instance attribute throughout initialization instead of using inline serialization or local variables.
Benchmark Serialization Update
python/cudf_polars/cudf_polars/experimental/benchmarks/utils_new_frontends.py
RunConfig.serialize now accepts StreamingEngine | None parameter; when engine is provided, executor context objects are stripped and RapidsMPF options are extracted and augmented into the serialized output.
Finalization Call Sites
python/cudf_polars/cudf_polars/experimental/benchmarks/utils_new_frontends.py
_finalize_benchmark_run accepts engine parameter and passes it to serialization; SPMD, Ray, and Dask benchmark finalization paths now invoke _finalize_benchmark_run(..., engine=engine) to enable engine-aware serialization.
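Based on the walkthrough, the engine-aware serialization might look roughly like this sketch. The `serialize` signature and the fake engine are illustrative, not the actual implementation; only `rapidsmpf_options` and `get_strings()` are named in the PR.

```python
import dataclasses
from types import SimpleNamespace


@dataclasses.dataclass
class ConfigOptions:
    # Hypothetical stand-in for cudf_polars.utils.config.ConfigOptions.
    executor: str = "streaming"


def serialize(result, config_options, engine=None):
    # Sketch of the updated RunConfig.serialize(engine=...): with no
    # engine, keep the old flat layout; with an engine, nest the resolved
    # RapidsMPF options alongside the dataclass fields.
    result = dict(result)
    if engine is None:
        result["config_options"] = dataclasses.asdict(config_options)
    else:
        result["config_options"] = {
            "config_options": dataclasses.asdict(config_options),
            "rapidsmpf_options": engine.rapidsmpf_options.get_strings(),
        }
    return result


# Fake engine whose rapidsmpf_options mimics Options.get_strings().
engine = SimpleNamespace(
    rapidsmpf_options=SimpleNamespace(
        get_strings=lambda: {"num_streaming_threads": "4"}
    )
)
out = serialize({"query": "q1"}, ConfigOptions(), engine=engine)
print(out["config_options"]["rapidsmpf_options"])
# {'num_streaming_threads': '4'}
```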

Estimated review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 58.33%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (4 passed)
  • Title check — ✅ Passed: the title 'Serialize engine config in new pdsh benchmark CLI' directly and clearly summarizes the main change.
  • Description check — ✅ Passed: the description explains that the PR updates the benchmarks CLI to serialize engine configuration so developers can see what options were actually used.
  • Linked Issues check — ✅ Passed: skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check — ✅ Passed: skipped because no linked issues were found for this pull request.



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@python/cudf_polars/cudf_polars/experimental/benchmarks/utils_new_frontends.py`:
- Around line 597-601: The serialized RapidsMPF options are stale because
serialize() reads engine.rapidsmpf_options while the frontend _reset() path
rebuilds the live Context from new Options without updating that cached
attribute; fix by ensuring the source of truth is consistent — either update
self.rapidsmpf_options inside every frontend _reset() (the methods that rebuild
Context/Options) to reflect the newly resolved Options, or change serialize() to
read/serialize from the same freshly resolved Options object used in the reset
path (the local variable passed into Context rebuild) instead of
engine.rapidsmpf_options; reference methods: serialize(), _reset(),
engine.rapidsmpf_options, self.rapidsmpf_options, Context, Options.
- Around line 1201-1203: The call to _finalize_benchmark_run(...) is happening
after exiting the with RayEngine(...) block so engine shutdown has already
occurred; move the _finalize_benchmark_run(args, run_config,
validation_failures, query_failures, engine=engine) invocation back inside the
with RayEngine(...) context so RunConfig.serialize(engine=engine) (and any
engine-dependent recording) runs while the engine is still live and before
shutdown.
- Around line 1263-1265: The call to _finalize_benchmark_run(...) is happening
after the DaskEngine context has exited so the engine is already shut down when
run_config.serialize(engine=engine) needs it; move the
_finalize_benchmark_run(args, run_config, validation_failures, query_failures,
engine=engine) invocation so it executes inside the with DaskEngine(...) block
(i.e., before the context manager exits) so run_config.serialize can access the
live engine during Dask runs.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: 4f40fd22-68af-45ff-b378-7e9bda8ca9e6

📥 Commits

Reviewing files that changed from the base of the PR and between 7d84936 and 0f2a37a.

📒 Files selected for processing (5)
  • python/cudf_polars/cudf_polars/experimental/benchmarks/utils_new_frontends.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/core.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/dask.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/ray.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/spmd.py

Comment on lines +597 to +601
rapidsmpf_options = engine.rapidsmpf_options.get_strings()
result["config_options"] = {
    "config_options": dataclasses.asdict(config_options),
    "rapidsmpf_options": rapidsmpf_options,
}

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Keep serialized RapidsMPF options in sync with engine resets.

serialize() now trusts engine.rapidsmpf_options, but the frontend _reset() paths rebuild their live Context from new Options objects without updating that cached attribute. After a reset, this JSON will report the old settings instead of the ones actually running. Please either refresh self.rapidsmpf_options in each _reset() or serialize from the same freshly resolved options object used by the reset path.
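One way to apply that suggestion, as a hypothetical sketch (the `_build_context` helper and the constructor signature are illustrative; only `rapidsmpf_options` and `_reset` appear in the review comment):

```python
class StreamingEngineBase:
    """Hypothetical sketch of keeping self.rapidsmpf_options in sync."""

    def __init__(self, rapidsmpf_options):
        self.rapidsmpf_options = rapidsmpf_options
        self._context = self._build_context(rapidsmpf_options)

    def _build_context(self, options):
        # Stand-in for rebuilding the live rapidsmpf Context.
        return {"options": options}

    def _reset(self, new_options):
        # Refresh the cached attribute when rebuilding the Context, so
        # serialize() always reads the options actually in use.
        self.rapidsmpf_options = new_options
        self._context = self._build_context(new_options)


engine = StreamingEngineBase({"num_streaming_threads": "4"})
engine._reset({"num_streaming_threads": "8"})
print(engine.rapidsmpf_options)
# {'num_streaming_threads': '8'}
```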


Comment on lines +1201 to +1203
_finalize_benchmark_run(
args, run_config, validation_failures, query_failures, engine=engine
)

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Serialize the Ray engine before leaving the context manager.

This now runs after with RayEngine(...) exits, so shutdown() has already torn down the engine state that RunConfig.serialize(engine=engine) reads. That means the new config recording can emit empty/default data or fail instead of capturing the live runtime settings. Move _finalize_benchmark_run(...) back inside the with block.

Suggested fix
     with RayEngine(
         rapidsmpf_options=run_config.streaming_options.to_rapidsmpf_options(),
         executor_options=executor_options,
         engine_options=engine_options,
         ray_init_options=ray_init_options,
     ) as engine:
         run_config = dataclasses.replace(run_config, n_workers=engine.nranks)
         records, plans, validation_failures, query_failures = _run_query_loop(
             benchmark,
             args,
             run_config,
             engine,
             numeric_type,
             date_type,
             validation_files,
         )
         run_config = dataclasses.replace(run_config, records=dict(records), plans=plans)
         run_config = _consolidate_logs(run_config, engine=engine)
-
-    _finalize_benchmark_run(
-        args, run_config, validation_failures, query_failures, engine=engine
-    )
+        _finalize_benchmark_run(
+            args, run_config, validation_failures, query_failures, engine=engine
+        )

Comment on lines +1263 to +1265
_finalize_benchmark_run(
args, run_config, validation_failures, query_failures, engine=engine
)

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Finalize the Dask run while the engine is still alive.

By the time this call executes, the with DaskEngine(...) block has already exited, so the engine has been shut down before run_config.serialize(engine=engine) reads from it. That defeats the new config-capture path for Dask runs. Call _finalize_benchmark_run(...) inside the with block instead.

Suggested fix
     try:
         with DaskEngine(
             rapidsmpf_options=run_config.streaming_options.to_rapidsmpf_options(),
             executor_options=executor_options,
             engine_options=engine_options,
             dask_client=dask_client,
         ) as engine:
             run_config = dataclasses.replace(run_config, n_workers=engine.nranks)
             records, plans, validation_failures, query_failures = _run_query_loop(
                 benchmark,
                 args,
                 run_config,
                 engine,
                 numeric_type,
                 date_type,
                 validation_files,
             )
             run_config = dataclasses.replace(
                 run_config, records=dict(records), plans=plans
             )
             run_config = _consolidate_logs(run_config, engine)
+            _finalize_benchmark_run(
+                args, run_config, validation_failures, query_failures, engine=engine
+            )
     finally:
         if dask_client is not None:
             dask_client.close()
-    _finalize_benchmark_run(
-        args, run_config, validation_failures, query_failures, engine=engine
-    )


Labels

bug (Something isn't working), cudf-polars (Issues specific to cudf-polars), non-breaking (Non-breaking change), Python (Affects Python cuDF API)

Projects

Status: In Progress

Development

Successfully merging this pull request may close these issues.

4 participants